#!/usr/bin/env python3

from multiprocessing.dummy import Pool

import pytest

from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry

cluster = ClickHouseCluster(__file__)

node1 = cluster.add_instance("node1", with_zookeeper=True)
node2 = cluster.add_instance("node2", with_zookeeper=True)
node3 = cluster.add_instance("node3", with_zookeeper=True)


@pytest.fixture(scope="module")
def started_cluster():
    global cluster
    try:
        cluster.start()
        yield cluster

    finally:
        cluster.shutdown()


def test_parallel_quorum_actually_parallel(started_cluster):
    settings = {
        "insert_quorum": "3",
        "insert_quorum_parallel": "1",
        "function_sleep_max_microseconds_per_block": "0",
    }
    for i, node in enumerate([node1, node2, node3]):
        node.query(
            "CREATE TABLE r (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/r', '{num}') ORDER BY tuple()".format(
                num=i
            )
        )

    p = Pool(10)

    def long_insert(node):
        node.query(
            "INSERT INTO r SELECT number, toString(number) FROM numbers(5) where sleepEachRow(1) == 0",
            settings=settings,
        )

    job = p.apply_async(long_insert, (node1,))

    node2.query("INSERT INTO r VALUES (6, '6')", settings=settings)
    assert node1.query("SELECT COUNT() FROM r") == "1\n"
    assert node2.query("SELECT COUNT() FROM r") == "1\n"
    assert node3.query("SELECT COUNT() FROM r") == "1\n"

    node1.query("INSERT INTO r VALUES (7, '7')", settings=settings)
    assert node1.query("SELECT COUNT() FROM r") == "2\n"
    assert node2.query("SELECT COUNT() FROM r") == "2\n"
    assert node3.query("SELECT COUNT() FROM r") == "2\n"

    job.get()

    assert node1.query("SELECT COUNT() FROM r") == "7\n"
    assert node2.query("SELECT COUNT() FROM r") == "7\n"
    assert node3.query("SELECT COUNT() FROM r") == "7\n"
    p.close()
    p.join()


def test_parallel_quorum_actually_quorum(started_cluster):
    for i, node in enumerate([node1, node2, node3]):
        node.query(
            "CREATE TABLE q (a UInt64, b String) ENGINE=ReplicatedMergeTree('/test/q', '{num}') ORDER BY tuple()".format(
                num=i
            )
        )

    with PartitionManager() as pm:
        pm.partition_instances(node2, node1, port=9009)
        pm.partition_instances(node2, node3, port=9009)
        with pytest.raises(QueryRuntimeException):
            node1.query(
                "INSERT INTO q VALUES(1, 'Hello')",
                settings={
                    "insert_quorum": "3",
                    "insert_quorum_parallel": "1",
                    "insert_quorum_timeout": "3000",
                },
            )

        assert_eq_with_retry(node1, "SELECT COUNT() FROM q", "1")
        assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "0")
        assert_eq_with_retry(node3, "SELECT COUNT() FROM q", "1")

        node1.query(
            "INSERT INTO q VALUES(2, 'wlrd')",
            settings={
                "insert_quorum": "2",
                "insert_quorum_parallel": "1",
                "insert_quorum_timeout": "3000",
            },
        )

        assert_eq_with_retry(node1, "SELECT COUNT() FROM q", "2")
        assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "0")
        assert_eq_with_retry(node3, "SELECT COUNT() FROM q", "2")

        def insert_value_to_node(node, settings):
            node.query("INSERT INTO q VALUES(3, 'Hi')", settings=settings)

        def insert_fail_quorum_timeout(node, settings):
            if "insert_quorum_timeout" not in settings:
                settings["insert_quorum_timeout"] = "1000"
            error = node.query_and_get_error(
                "INSERT INTO q VALUES(3, 'Hi')", settings=settings
            )
            assert (
                "DB::Exception: Unknown quorum status. The data was inserted in the local replica but we could not verify quorum. Reason: Timeout while waiting for quorum"
                in error
            ), error

        p = Pool(2)
        res = p.apply_async(
            insert_fail_quorum_timeout,
            (
                node1,
                {
                    "insert_quorum": "3",
                    "insert_quorum_parallel": "1",
                    "insert_quorum_timeout": "1000",
                },
            ),
        )

        assert_eq_with_retry(
            node1,
            "SELECT COUNT() FROM system.parts WHERE table == 'q' and active == 1",
            "3",
        )
        assert_eq_with_retry(
            node3,
            "SELECT COUNT() FROM system.parts WHERE table == 'q' and active == 1",
            "3",
        )
        assert_eq_with_retry(
            node2,
            "SELECT COUNT() FROM system.parts WHERE table == 'q' and active == 1",
            "0",
        )

        # Insert to the second to satisfy quorum
        insert_fail_quorum_timeout(
            node2,
            {
                "insert_quorum": "3",
                "insert_quorum_parallel": "1",
                "insert_quorum_timeout": "1000",
            },
        )

        res.get()

        assert_eq_with_retry(node1, "SELECT COUNT() FROM q", "3")
        assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "0")
        assert_eq_with_retry(node3, "SELECT COUNT() FROM q", "3")

        p.close()
        p.join()

    node2.query("SYSTEM SYNC REPLICA q", timeout=10)
    assert_eq_with_retry(node2, "SELECT COUNT() FROM q", "3")
